package com.data.mall.etl.consumer;

import com.alibaba.fastjson.JSON;
import com.data.mall.common.KafkaTrackingDTO;
import com.data.mall.etl.domain.enums.Event;
import com.data.mall.etl.service.ItemEventETL;
import com.data.mall.etl.service.PromotionEventETL;
import com.data.mall.etl.service.UserEventETL;
import com.data.mall.utils.Either;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;

/**
 * @author shixukai
 * @Package com.data.mall.common
 * @Description: TODO
 * @date 2021/4/29
 */
@Slf4j
@Component
@Service("eventTrackingConsumerContext")
@DependsOn("dynamicBeanManager")
public class KafkaTrackingConsumer {

    public final static Map<Event, List<Function<KafkaTrackingDTO,Object>>> eventListConcurrentMap = Maps.newConcurrentMap();

    @Autowired
    UserEventETL userEventETL;

    @Autowired
    ItemEventETL itemEventETL;

    @Autowired
    PromotionEventETL promotionEventETL;

    // 消费监听
    @KafkaListener(id= "tracking1" ,topics = {"tracking"}, groupId = "tracking")
    public void onMessageConsumer(ConsumerRecord<?, ?> record) {
        // 消费的哪个topic、partition的消息,打印出消息内容
        log.info("简单消费：" + record.topic() + "-" + record.partition() + "-" + record.value());
        String message = (String) record.value();
        try{
            KafkaTrackingDTO dto = JSON.parseObject(message,KafkaTrackingDTO.class);
            if(!StringUtils.isEmpty(dto.getEvent())){
                Optional.ofNullable(eventListConcurrentMap.get(dto.getEvent())).ifPresent(o -> o.stream().filter(Objects::nonNull).map(f -> Either.lift(f)).
                        forEach(f -> eventApply(f, dto)));
            }
        }catch (RuntimeException e){
            log.error("埋点数据消费错误",e);
        }
    }

    /**
     * 初始化事件管理器
     */
    @PostConstruct
    public void post(){
        // 用户浏览事件
        registerEvent(Event.USER_VIEW, o -> userEventETL.userViewEvent(o));
        // 用户下单事件
        registerEvent(Event.USER_ORDER, o -> userEventETL.userOrderEvent(o));
        // 用户注册
        registerEvent(Event.USER_REG, o -> userEventETL.userReg(o));
        //用户登录
        registerEvent(Event.USER_SIGN,o -> userEventETL.userLoginEvent(o));
        // 商品创建
        registerEvent(Event.CREATE_ITEM, o -> itemEventETL.createItem(o));
        // 活动上线
        registerEvent(Event.PUBLISH_PROMOTION, o -> promotionEventETL.publishPromotion(o));
    }

    /**
     * 注册事件管理器
     * @param event
     * @param eventHandler
     */
    public synchronized static void registerEvent(Event event, Function<KafkaTrackingDTO,Object> eventHandler){
        putIfSent(event);
        eventListConcurrentMap.get(event).add(eventHandler);
    }

    public static void putIfSent(Event event) {
        if (!eventListConcurrentMap.containsKey(event)) {
            eventListConcurrentMap.put(event, Lists.newArrayList());
        }
    }

    public Object eventApply(Function<KafkaTrackingDTO, Either> eventHandler, KafkaTrackingDTO message) {
        try {
            return eventHandler.apply(message);
        } catch (RuntimeException e) {
            log.error(e.getMessage(), e);
            return null;
        }
    }
}